#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import json
import subprocess

import packagelib
import storagelib
import testlib


def console_screenshot(machine, name):
    subprocess.run("virsh -c qemu:///session screenshot %s '%s'" % (str(machine._domain.ID()), name),
                   shell=True)
    testlib.attach(name, move=True)
    print("Wrote screenshot to " + name)


@testlib.nondestructive
@testlib.skipImage("cryptsetup uses too much memory, OOM on our test VMs", "rhel-8-*")
class TestStorageLuks(storagelib.StorageCase):

    def testLuks(self):
        self.allow_journal_messages("Device is not initialized.*", ".*could not be opened.")
        m = self.machine
        b = self.browser

        mount_point_secret = "/run/secret"

        self.login_and_go("/storage")

        # Add a disk and partition it
        disk = self.add_ram_disk(100)
        self.click_card_row("Storage", name=disk)
        self.click_card_dropdown("Solid State Drive", "Create partition table")
        self.dialog({"type": "gpt"})
        b.wait_text(self.card_row_col("GPT partitions", 1, 1), "Free space")

        self.assertEqual(m.execute("grep -v ^# /etc/crypttab || true").strip(), "")

        # Format it with luks
        self.click_dropdown(self.card_row("GPT partitions", 1), "Create partition")
        self.dialog_wait_open()
        self.dialog_set_val("size", 60)
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("crypto", self.default_crypto_type)
        self.dialog_set_val("name", "ENCRYPTED")
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("passphrase2", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("store_passphrase.on", val=True)
        self.dialog_set_val("crypto_options", "crypto,options")
        self.dialog_set_val("mount_point", mount_point_secret)
        b.assert_pixels("#dialog", "format")
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_text(self.card_row_col("GPT partitions", 1, 2), "ext4 filesystem (encrypted)")
        b.wait_text(self.card_row_col("GPT partitions", 1, 3), mount_point_secret)

        dev = "/dev/" + b.text(self.card_row_col("GPT partitions", 1, 1))
        self.click_card_row("GPT partitions", 1)

        if self.default_crypto_type == "luks1":
            b.wait_text(self.card_desc("Encryption", "Encryption type"), "LUKS1")
        elif self.default_crypto_type == "luks2":
            b.wait_text(self.card_desc("Encryption", "Encryption type"), "LUKS2")

        uuid = m.execute(f"cryptsetup luksUUID {dev}").strip()
        cleartext_dev = "/dev/mapper/luks-" + uuid
        passphrase_path = "/etc/luks-keys/luks-" + uuid

        self.assert_in_configuration(dev, "crypttab", "options", "crypto,options")
        self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_configuration(dev, "crypttab", "passphrase-path", passphrase_path)
        self.assertEqual(m.execute(f"cat {passphrase_path}"), "vainu-reku-toma-rolle-kaja")

        self.assert_in_configuration(cleartext_dev, "fstab", "dir", mount_point_secret)
        self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")

        # cut off minutes, to avoid a too wide race condition in the test
        date_no_mins = b.eval_js("Intl.DateTimeFormat('en', { dateStyle: 'medium', timeStyle: 'short' }).format()").split(':')[0]
        b.wait_in_text(self.card_desc("Encryption", "Stored passphrase"),
                       f"Last modified: {date_no_mins}:")

        b.assert_pixels(self.card("Encryption"), "card",
                        ignore=["dt:contains(Stored passphrase) + dd",
                                "dt:contains(Cleartext device) + dd"])

        # Unmount. This locks it
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.assert_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_child_configuration(dev, "fstab", "opts", "noauto")
        self.wait_not_mounted("Filesystem")

        # Mount, this uses the stored passphrase for unlocking
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog_wait_open()
        self.dialog_wait_val("mount_point", mount_point_secret)
        self.dialog_apply()
        self.dialog_wait_close()
        self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")
        self.wait_mounted("ext4 filesystem")

        b.click(self.card_desc_action("Encryption", "Options"))
        self.dialog({"options": "weird,options"})
        self.assert_in_configuration(dev, "crypttab", "options", "weird,options")

        # Change stored passphrase
        b.click(self.card_desc_action("Encryption", "Stored passphrase"))
        self.dialog({"passphrase": "wrong-passphrase"})
        self.assert_in_configuration(dev, "crypttab", "passphrase-path", passphrase_path)
        self.assertEqual(m.execute(f"cat {passphrase_path}"), "wrong-passphrase")

        # Unmount it
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.assert_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_child_configuration(dev, "fstab", "opts", "noauto")
        self.wait_not_mounted("Filesystem")

        # Mount, this tries the wrong passphrase but eventually prompts.
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog_wait_open()
        self.dialog_apply()
        b.wait_in_text("#dialog", "Failed to activate device:")
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        self.dialog_wait_close()
        self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")
        self.wait_mounted("ext4 filesystem")

        # Remove passphrase
        b.click(self.card_desc_action("Encryption", "Stored passphrase"))
        self.dialog({"passphrase": ""})
        self.assert_in_configuration(dev, "crypttab", "passphrase-path", "")

        # Unmount it
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.assert_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_child_configuration(dev, "fstab", "opts", "noauto")
        self.wait_not_mounted("Filesystem")

        # Mount it readonly.  This asks for a passphrase.
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog({"mount_options.ro": True,
                     "passphrase": "vainu-reku-toma-rolle-kaja"})
        self.wait_mounted("ext4 filesystem")
        self.assertIn("ro", m.execute(f"findmnt -s -n -o OPTIONS {mount_point_secret}"))
        self.assert_in_configuration(dev, "crypttab", "options", "readonly")
        self.assert_in_configuration(cleartext_dev, "fstab", "opts", "ro")

        # Check that the clear text device is actually readonly
        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "1")

        # Change "read only" mount option. This should reopen LUKS as
        # necessary and ask for a passphrase.
        b.click(self.card_desc("ext4 filesystem", "Mount point") + " button")
        self.dialog_wait_open()
        self.dialog_set_val("mount_options.ro", val=False)
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        self.dialog_wait_close()

        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "0")

        # Delete the partition.
        self.click_card_dropdown("Partition", "Delete")
        self.confirm()
        b.wait_text(self.card_row_col("GPT partitions", 1, 1), "Free space")
        # luksmeta-monitor-hack.py races with the partition deletion
        self.allow_journal_messages('Unknown device .*: No such file or directory')

        self.assertEqual(m.execute("grep -v ^# /etc/crypttab || true").strip(), "")
        self.assertEqual(m.execute(f"grep {mount_point_secret} /etc/fstab || true"), "")

        # luksmeta-monitor-hack.py might leave a udevadm process
        # behind, so let's check that the session goes away cleanly
        # after a logout.

        b.logout()
        testlib.wait(lambda: m.execute("(loginctl list-users | grep admin) || true") == "")

        # FIXME: race condition after unmounting; hard to investigate, re-check after storage redesign
        self.allow_browser_errors("validateDOMNesting.*cannot appear as a child.*<tr> ul")

    def testFormatReadOnly(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        disk = self.add_ram_disk(400)
        self.click_card_row("Storage", name=disk)

        # Create a encrypted filesystem and mount it readonly. This
        # needs to reopen the LUKS layer after formatting.

        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "ext4",
                     "mount_point": f"{self.mnt_dir}/foo",
                     "mount_options.ro": True,
                     "crypto": "luks1",
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja"})
        b.wait_visible(self.card("ext4 filesystem"))

        uuid = m.execute(f"cryptsetup luksUUID {disk}").strip()
        cleartext_dev = "/dev/mapper/luks-" + uuid

        self.assert_in_configuration(disk, "crypttab", "options", "readonly")
        self.assert_in_configuration(cleartext_dev, "fstab", "opts", "ro")
        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "1")

        # Reformat it, while keeping the encryption layer and keeping
        # it read-only.  This needs to reopen LUKS twice, once to make
        # the cleartext device writable for formatting, and once to
        # make it read-only again, as requested.

        self.click_card_dropdown("ext4 filesystem", "Format")
        self.dialog_wait_open()
        self.dialog_wait_val("mount_point", f"{self.mnt_dir}/foo")
        self.dialog_wait_val("type", "ext4")
        self.dialog_wait_val("crypto", " keep")
        self.dialog_wait_val("crypto_options", "")
        b.wait_visible(self.dialog_field("mount_options.ro") + ":checked")
        self.dialog_set_val("type", "xfs")
        self.dialog_set_val("old_passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_visible(self.card("xfs filesystem"))

        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "1")

    def testNoFsys(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk and format it with luks, but without filesystem
        disk = self.add_ram_disk()
        self.click_card_row("Storage", name=disk)

        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "empty",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja"})
        b.wait_visible(self.card("Unformatted data"))

        # Lock it
        b.click(self.card_button("Unformatted data", "Lock"))
        self.confirm()
        b.wait_visible(self.card("Locked data"))

        # Make it readonly

        # The locking above changes crypttab asynchronously, and there
        # is no indication in the UI when cockpit has caught up with
        # that. So we retry the dialog when it looks like Cockpit is
        # out of synch...

        self.dialog_with_error_retry(trigger=lambda: b.click(self.card_desc_action("Encryption", "Options")),
                                     values={"options": "readonly"},
                                     errors=["Didn't find entry to remove"])
        self.assertNotEqual(m.execute("grep readonly /etc/crypttab"), "")

        # Unlock it
        b.click(self.card_button("Locked data", "Unlock"))
        self.dialog({"passphrase": "vainu-reku-toma-rolle-kaja"})

        # Should be unlocked readonly
        uuid = m.execute(f"cryptsetup luksUUID {disk}").strip()
        cleartext_dev = "/dev/mapper/luks-" + uuid
        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "1")

        # Now create a empty, encrypted partition
        self.click_card_dropdown("Solid State Drive", "Create partition table")
        self.dialog({"type": "gpt"})
        b.wait_text(self.card_row_col("GPT partitions", 1, 1), "Free space")
        self.click_dropdown(self.card_row("GPT partitions", 1), "Create partition")
        self.dialog({"type": "empty",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja"})
        b.wait_text(self.card_row_col("GPT partitions", 1, 2), "Unformatted data (encrypted)")

    def testKeepKeys(self):
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk and format it with luks and a filesystem

        disk = self.add_ram_disk()
        self.click_card_row("Storage", name=disk)

        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "ext4",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja",
                     "mount_point": f"{self.mnt_dir}/foo"})
        b.wait_visible(self.card("ext4 filesystem"))

        # Format it again but keep the keys

        self.click_card_dropdown("ext4 filesystem", "Format")
        self.dialog({"type": "ext4",
                     "crypto": " keep",
                     "mount_point": f"{self.mnt_dir}/foo"})
        b.wait_visible(self.card("ext4 filesystem"))

        # Unmount (and lock) it

        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.wait_not_mounted("Filesystem")

        # Format it again and keep the keys.  Because it's locked, we
        # need the old passphrase.

        self.click_card_dropdown("Filesystem", "Format")
        self.dialog({"type": "ext4",
                     "crypto": " keep",
                     "old_passphrase": "vainu-reku-toma-rolle-kaja",
                     "mount_point": f"{self.mnt_dir}/foo"})
        self.wait_mounted("ext4 filesystem")


class TestStorageLuksDestructive(storagelib.StorageCase):

    # LUKS uses memory hard PBKDF, 1 GiB is not enough; see https://bugzilla.redhat.com/show_bug.cgi?id=1881829
    provision = {
        "0": {"memory_mb": 1536}
    }

    def testLuks1Slots(self):
        self.allow_journal_messages("Device is not initialized.*", ".*could not be opened.")
        m = self.machine
        b = self.browser

        # This should work without any of the Clevis stuff.
        m.execute("rm -f /usr/bin/luksmeta /usr/bin/clevis*")

        mount_point_secret = "/run/secret"

        error_base = "Error unlocking /dev/sda: Failed to activate device: "
        error_messages = [error_base + "Operation not permitted",
                          error_base + "Incorrect passphrase."]

        self.login_and_go("/storage")

        m.add_disk("50M", serial="MYDISK")
        dev = "/dev/sda"
        self.click_card_row("Storage", name=dev)
        # create volume and passphrase
        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "ext4",
                     "crypto": "luks1",
                     "name": "ENCRYPTED",
                     "mount_point": mount_point_secret,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja"},
                    secondary=True)
        self.wait_not_mounted("Filesystem")
        b.wait_text(self.card_desc("Encryption", "Encryption type"), "LUKS1")
        b.wait_in_text(self.card_desc("Encryption", "Options"), "nofail")

        uuid = m.execute(f"cryptsetup luksUUID {dev}").strip()
        cleartext_dev = "/dev/mapper/luks-" + uuid

        # add one more passphrase
        panel = "#encryption-keys "
        b.wait_visible(panel)
        b.click(self.card("Encryption") + " [aria-label='Add']")
        self.dialog_wait_open()
        self.dialog_wait_apply_enabled()
        self.dialog_set_val("new_passphrase", "vainu-reku-toma-rolle-kaja-1")
        self.dialog_set_val("new_passphrase2", "vainu-reku-toma-rolle-kaja-1")
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        self.dialog_wait_close()
        # unlock with first passphrase
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog({"passphrase": "vainu-reku-toma-rolle-kaja"})
        self.wait_mounted("ext4 filesystem")
        self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")
        # unlock with second passphrase
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.assert_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_child_configuration(dev, "fstab", "opts", "noauto")
        self.wait_not_mounted("Filesystem")
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog({"passphrase": "vainu-reku-toma-rolle-kaja-1"})
        self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")
        self.wait_mounted("ext4 filesystem")
        # delete second key slot
        b.click(panel + "tr:nth-child(2) button[aria-label='Remove']")
        # do not accept the same passphrase
        b.set_input_text("#remove-passphrase", "vainu-reku-toma-rolle-kaja-1")
        b.click("button:contains('Remove')")
        b.wait_in_text(".pf-v6-c-alert__title", "No key available with this passphrase.")
        # delete with passphrase from slot 0
        b.set_input_text("#remove-passphrase", "vainu-reku-toma-rolle-kaja")
        b.click("button:contains('Remove')")
        with b.wait_timeout(30):
            b.wait_not_present("#remove-passphrase")
        # check that it is not possible to unlock with deleted passphrase
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        self.assert_in_configuration(dev, "crypttab", "options", "noauto")
        self.assert_in_child_configuration(dev, "fstab", "opts", "noauto")
        self.wait_not_mounted("Filesystem")
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog_wait_open()
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja-1")
        self.dialog_apply()
        b.wait_visible(".pf-v6-c-alert")
        self.assertIn(b.text("h4.pf-v6-c-alert__title:not(span)").split("Danger alert:", 1).pop(), error_messages)
        self.dialog_cancel()

        # add more passphrases, seven exactly, to reach the limit of eight for LUKSv1
        for i in range(1, 8):
            b.click(self.card("Encryption") + " [aria-label='Add']")
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            self.dialog_set_val("new_passphrase", f"vainu-reku-toma-rolle-kaja-{i}")
            self.dialog_set_val("new_passphrase2", f"vainu-reku-toma-rolle-kaja-{i}")
            self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            self.dialog_wait_close()

        # check if add button is inactive
        b.wait_visible(self.card("Encryption") + " [aria-label='Add']:disabled")
        # check if edit button is inactive
        slots_row = panel + " tr:first-child"
        b.wait_visible(slots_row + " button:disabled")

        # remove one slot
        slots_list = panel
        b.wait_visible(slots_list + " td:contains('Slot 7')")
        b.click(slots_list + "tr:last-child button[aria-label='Remove']")
        b.set_input_text("#remove-passphrase", "vainu-reku-toma-rolle-kaja-6")
        b.click("button:contains('Remove')")
        with b.wait_timeout(30):
            b.wait_not_present("#remove-passphrase")
        # check if buttons have become enabled after removing last slot
        b.wait_not_present(slots_list + ":disabled")
        b.wait_not_present(panel + ":disabled")
        # remove slot 0, with the original passphrase
        b.click(slots_list + "tr:nth-child(1) button[aria-label='Remove']")
        b.click("#force-remove-passphrase")
        b.click("button:contains('Remove')")
        with b.wait_timeout(30):
            b.wait_not_present("#remove-passphrase")
            # check that it is not possible to unlock with deleted passphrase
            b.click(self.card_button("Filesystem", "Mount"))
            self.dialog_wait_open()
            self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            b.wait_visible(".pf-v6-c-alert")
            self.assertIn(b.text("h4.pf-v6-c-alert__title:not(span)").split("Danger alert:", 1).pop(), error_messages)
            self.dialog_cancel()
            # change one of the passphrases
            b.click(slots_list + "tr:last-child [aria-label='Edit']")
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            self.dialog_set_val("old_passphrase", "vainu-reku-toma-rolle-kaja-6")
            self.dialog_set_val("new_passphrase", "vainu-reku-toma-rolle-kaja-8")
            self.dialog_set_val("new_passphrase2", "vainu-reku-toma-rolle-kaja-8")
            self.dialog_apply()
            self.dialog_wait_close()
            # unlock volume with the negwly created passphrase
            b.click(self.card_button("Filesystem", "Mount"))
            self.dialog({"passphrase": "vainu-reku-toma-rolle-kaja-8"})
            self.wait_mounted("ext4 filesystem")
            self.assert_not_in_configuration(dev, "crypttab", "options", "noauto")
            self.assert_not_in_configuration(cleartext_dev, "fstab", "opts", "noauto")

    def testReboot(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk and format it with luks and a filesystem, then
        # reboot and check that the filesystem is mounted.

        dev = "/dev/sda"
        m.add_disk("50M", serial="MYDISK")
        self.click_card_row("Storage", name=dev)

        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "ext4",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja",
                     "mount_point": f"{self.mnt_dir}/foo"})
        self.wait_mounted("ext4 filesystem")

        self.setup_systemd_password_agent("vainu-reku-toma-rolle-kaja")
        self.reboot()
        m.start_cockpit()
        b.relogin()
        b.enter_page("/storage")

        self.wait_mounted("ext4 filesystem")


@testlib.skipImage("clevis/tang not installed", "*-bootc")
class TestStorageNBDE(storagelib.StorageCase, packagelib.PackageCase):
    provision = {
        "0": {"address": "10.111.112.1/20", "memory_mb": 2048},
        "tang": {"address": "10.111.112.5/20"}
    }

    def testBasic(self):
        m = self.machine
        b = self.browser

        # Only Arch gets it right...
        need_fixing = (m.image != "arch")

        mount_point_secret = "/run/secret"

        tang_m = self.machines["tang"]
        tang_m.execute("systemctl start tangd.socket")
        tang_m.execute("firewall-cmd --add-port 80/tcp")

        if need_fixing:
            self.addPackageSet("clevis")
            self.enableRepo()

        self.login_and_go("/storage")

        # Add a disk and format it with luks
        dev = "/dev/sda"
        m.add_disk("50M", serial="MYDISK")
        self.click_card_row("Storage", name=dev)

        self.click_card_dropdown("Unformatted data", "Format")
        self.dialog({"type": "ext4",
                     "crypto": self.default_crypto_type,
                     "name": "ENCRYPTED",
                     "mount_point": mount_point_secret,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja"},
                    secondary=True)
        self.wait_not_mounted("Filesystem")

        b.wait_in_text(self.card_desc("Encryption", "Options"), "nofail")
        panel = "#encryption-keys "
        b.wait_in_text(panel + "tr:nth-child(1)", "Passphrase")

        # Add a key
        #
        b.click(self.card("Encryption") + " [aria-label='Add']")
        self.dialog_wait_open()
        self.dialog_wait_apply_enabled()
        self.dialog_set_val("type", "tang")
        self.dialog_set_val("tang_url", "10.111.112.5")
        self.dialog_set_val("passphrase", "wrong-passphrase")
        self.dialog_apply()
        b.wait_in_text("#dialog", "Check the key hash with the Tang server")
        b.wait_in_text("#dialog", tang_m.execute("tang-show-keys").strip())
        self.dialog_apply()
        with b.wait_timeout(60):
            if need_fixing:
                b.wait_in_text("#dialog", "Add Network Bound Disk Encryption")
                b.wait_in_text("#dialog", "The clevis-systemd package must be installed")
                self.dialog_apply()
            b.wait_in_text("#dialog", "No key available with this passphrase.")
            self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            self.dialog_wait_close()
        b.wait_in_text(panel + "tr:nth-child(2)", "10.111.112.5")

        # Adding the key should add "_netdev" options
        #
        b.wait_in_text(self.card_desc("Filesystem", "Mount point"), "after network")
        b.wait_in_text(self.card_desc("Encryption", "Options"), "_netdev")

        # Mount it.  This should succeed without passphrase.
        #
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog_wait_open()
        self.dialog_wait_val("mount_point", mount_point_secret)
        self.dialog_apply()
        self.dialog_wait_close()
        self.wait_mounted("ext4 filesystem")

        # LUKS should be read/write
        uuid = m.execute(f"cryptsetup luksUUID {dev}").strip()
        cleartext_dev = "/dev/mapper/luks-" + uuid
        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "0")

        # Remount "readonly", LUKS should be readonly
        b.click(self.card_button("ext4 filesystem", "Unmount"))
        self.confirm()
        b.click(self.card_button("Filesystem", "Mount"))
        self.dialog_wait_open()
        self.dialog_set_val("mount_options.ro", val=True)
        self.dialog_apply()
        self.dialog_wait_close()
        self.wait_mounted("ext4 filesystem")

        # Now LUKS should be readonly
        self.assertEqual(m.execute(f"lsblk -no RO {cleartext_dev}").strip(), "1")

        # Edit the key, without providing an existing passphrase
        #
        b.click(panel + "tr:nth-child(2) [aria-label='Edit']")
        self.dialog_wait_open()
        self.dialog_wait_apply_enabled()
        self.dialog_wait_val("tang_url", "10.111.112.5")
        self.dialog_set_val("tang_url", "http://10.111.112.5/")
        self.dialog_apply()
        b.wait_in_text("#dialog", "Check the key hash with the Tang server")
        b.wait_in_text("#dialog", tang_m.execute("tang-show-keys").strip())
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_in_text(panel + "tr:nth-child(2)", "http://10.111.112.5/")

        # Reset the options so that we can check that they get added
        # also when a second key is added.
        #
        b.click(self.card_desc("ext4 filesystem", "Mount point") + " button")
        self.dialog({"at_boot": "nofail"})
        b.wait_in_text(self.card_desc("Encryption", "Options"), "nofail")

        if need_fixing:
            # Break things again, including PackageKit
            m.execute("if type dnf; then dnf remove -y clevis-systemd; else apt-get purge -y clevis-systemd; fi")
            m.execute("systemctl mask --now packagekit")

        # Add a second key, this should try to fix things again, but
        # we have to help with the package install
        #
        b.click(self.card("Encryption") + " [aria-label='Add']")
        self.dialog_wait_open()
        self.dialog_wait_apply_enabled()
        self.dialog_set_val("type", "tang")
        self.dialog_set_val("tang_url", "http://10.111.112.5")
        self.dialog_apply()
        b.wait_in_text("#dialog", "Check the key hash with the Tang server")
        b.wait_in_text("#dialog", tang_m.execute("tang-show-keys").strip())
        self.dialog_apply()
        with b.wait_timeout(60):
            if need_fixing:
                b.wait_in_text("#dialog", "Add Network Bound Disk Encryption")
                b.wait_in_text("#dialog", "The clevis-systemd package must be installed")
                self.dialog_apply()
                b.wait_in_text("#dialog", "PackageKit is not installed")
                self.dialog_cancel()
                self.dialog_wait_close()

                # Manually install the missing package, Cockpit should
                # be happy with that even when PackageKit is not
                # available.  Also disable the unit, for variety.
                # Cockpit will enable it without needing explicit
                # confirmation.
                #
                m.execute("if type dnf; then dnf install -y clevis-systemd; else apt-get install -y clevis-systemd; fi")
                m.execute("systemctl disable --now clevis-luks-askpass.path")

                b.click(self.card("Encryption") + " [aria-label='Add']")
                self.dialog_wait_open()
                self.dialog_wait_apply_enabled()
                self.dialog_set_val("type", "tang")
                self.dialog_set_val("tang_url", "http://10.111.112.5")
                self.dialog_apply()
                b.wait_in_text("#dialog", "Check the key hash with the Tang server")
                b.wait_in_text("#dialog", tang_m.execute("tang-show-keys").strip())
                self.dialog_apply()

            self.dialog_wait_close()

        b.wait_in_text(panel + "tr:nth-child(3)", "http://10.111.112.5")

        # This should bring the options back.
        #
        b.wait_in_text(self.card_desc("ext4 filesystem", "Mount point"), "after network")
        b.wait_in_text(self.card_desc("Encryption", "Options"), "_netdev")

        # Reboot
        #
        self.reboot()
        m.start_cockpit()
        b.relogin()
        b.enter_page("/storage")

        self.wait_mounted("ext4 filesystem")

        # Remove one key on client
        #
        b.click(panel + 'tr:contains("Slot 1") button[aria-label="Remove"]')
        self.confirm()
        b.wait_not_present(panel + 'tr:contains("Slot 1")')

    @testlib.skipImage("TODO: don't know how to encrypt the rootfs", "debian-*", "ubuntu-*", "arch")
    # this doesn't work in ostree either, but we don't run storage tests there
    @testlib.skipWsContainer("newly built root doesn't contain ws container")
    @testlib.timeout(1200)
    def testRootReboot(self):
        m = self.machine
        b = self.browser

        tang_m = self.machines["tang"]
        tang_m.execute("systemctl start tangd.socket")
        tang_m.execute("firewall-cmd --add-port 80/tcp")

        try:
            self.modify_rootfs(passphrase="einszweidrei")
        except Exception:
            console_screenshot(m, "failed-encrypt.ppm")
            raise

        self.assertIn("crypt", m.execute("lsblk -snlo TYPE $(findmnt -no SOURCE /)"))

        self.addPackageSet("clevis")
        self.enableRepo()

        self.login_and_go("/storage")

        # While we are here, let's quickly test that Cockpit warns
        # about renaming the volume group that contains the root
        # filesystem.

        b.go("#/vg/root")
        b.click(self.card_desc_action("LVM2 volume group", "Name"))
        self.dialog_wait_open()
        b.wait_in_text("#dialog", "This volume group contains the root filesystem.")
        b.wait_visible("#dialog button.apply.pf-m-danger")
        self.dialog_cancel()
        self.dialog_wait_close()
        b.go("#/")

        # Add a clevis key and then reboot.
        #
        # We also remove the original passphrase in order to be sure
        # that it was in fact clevis that has unlocked the rootfs, and
        # not the magic provided by "modify_rootfs"

        self.click_card_row("Storage", location="root")

        panel = "#encryption-keys "
        b.wait_in_text(panel + "tr:nth-child(1)", "Passphrase")

        with b.wait_timeout(360):
            b.click(self.card("Encryption") + " [aria-label='Add']")
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            self.dialog_set_val("type", "tang")
            self.dialog_set_val("tang_url", "10.111.112.5")
            self.dialog_set_val("passphrase", "einszweidrei")
            self.dialog_apply()
            b.wait_in_text("#dialog", "Check the key hash with the Tang server")
            b.wait_in_text("#dialog", tang_m.execute("tang-show-keys").strip())
            self.dialog_apply()
            b.wait_in_text("#dialog", "Add Network Bound Disk Encryption")
            b.wait_in_text("#dialog", "The clevis-dracut package must be installed")
            b.wait_in_text("#dialog", "The initrd must be regenerated")
            self.dialog_apply()
            self.dialog_wait_close()

        b.click(panel + "tr:nth-child(1) button[aria-label='Remove']")
        b.click("#force-remove-passphrase")
        b.click("button:contains('Remove')")
        b.wait_in_text(panel + "tr:nth-child(1)", "Keyserver")

        # Tell the initrd to configure our special inter-machine
        # network that has the "tang" machine.
        routes = json.loads(m.execute("ip --json route show 10.111.112.0/20"))
        iface = routes[0]["dev"]
        m.execute(f"grubby --update-kernel=ALL --args='ip=10.111.112.1::10.111.112.1:255.255.255.0::{iface}:off'")

        try:
            self.reboot(timeout_sec=300)
        except Exception:
            console_screenshot(m, "failed-reboot.ppm")
            raise

        m.start_cockpit()
        b.relogin()
        b.enter_page("/storage")

        self.assertIn("crypt", m.execute("lsblk -snlo TYPE $(findmnt -no SOURCE /)"))


if __name__ == '__main__':
    testlib.test_main()
